package com.sea.bei

import java.util

import com.alibaba.fastjson.JSONObject
import com.ververica.cdc.connectors.mysql.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.{DebeziumDeserializationSchema, DebeziumSourceFunction, StringDebeziumDeserializationSchema}
import io.debezium.data.Envelope
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.apache.kafka.connect.data.{ Struct}
import org.apache.kafka.connect.source.SourceRecord

/**
  * Date  2021/11/9-12:39
  *
  * 自定义序列化
  *
  * {
  *   "db":"",
  *   "tablename":"",
  *   "before":{"id":"1","name":"xt"},
  *   "after":{"id":"2","name":"ww"},
  *   "optype":"操作类型"
  * }
  *
  *
  * 原数据：
  *
  * SourceRecord{sourcePartition={server=mysql_binlog_source},sourceOffset={ts_sec=1636434219, file=mysql-bin.000013, pos=11819}}
  * ConnectRecord{
  *   topic='mysql_binlog_source.flinkcdc.user_info',
  *   kafkaPartition=null,
  *   key=Struct{id=3},
  *   keySchema=Schema{mysql_binlog_source.flinkcdc.user_info.Key:STRUCT},
  *   value=Struct{
  *     after=Struct{id=3},
  *     source=Struct
  *       {
  *         version=1.5.2.Final,
  *         connector=mysql,
  *         name=mysql_binlog_source,
  *         ts_ms=1636434219734,
  *         snapshot=last,
  *         db=flinkcdc,
  *         table=user_info,
  *         server_id=0,
  *         file=mysql-bin.000013,
  *         pos=11819,
  *         row=0
  *       },
  *     op=r,
  *     ts_ms=1636434219738
  *   },
  *   valueSchema=Schema{
  *     mysql_binlog_source.flinkcdc.user_info.Envelope:STRUCT
  *   },
  *   timestamp=null,
  *   headers=ConnectHeaders(headers=)
  * }
  *
  *  注意：
  *    1：当在pom.xml文件中使用如下配置时：
  *     <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>
  
    在读取mysql的修改数据时，flink收集到的数据为一条数据：其中有before（修改前的数据）、after（修改后的数据）
  
    但是在使用如下时
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>
  
    在读取mysql的修改数据时，flink收集到的数据为两条数据：其中有前一条数据为操作类型为删除，后一条数据操作类型为插入
  
  *
  *
  * 二：数据输出
  *   1 读取数据库现有的数据
  *     {"before":{},"after":{"sex":"1","name":"11","id":"3"},"operationtype":"read","tablename":"user_info","db":"flinkcdc"}
  *   2 修改数据
  *     {"before":{"sex":"1","name":"11","id":"3"},"after":{"sex":"2","name":"11","id":"3"},"operationtype":"update","tablename":"user_info","db":"flinkcdc"}
  *   3 删除数据
  *     {"before":{"sex":"11","name":"11","id":"66"},"after":{},"operationtype":"delete","tablename":"user_info","db":"flinkcdc"}
  *
  */
object FlinkCDCCustomerSchema {

  def main(args: Array[String]): Unit = {

    // Set up the streaming environment; parallelism 1 keeps the change events ordered in console output.
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Flink-CDC keeps the binlog read position in checkpoint state; to resume from where it left off,
    // the job must be restarted from a Checkpoint or Savepoint.
    val mysqlSource: DebeziumSourceFunction[String] = MySqlSource
      .builder()
      .hostname("localhost")
      .port(3306)
      .username("root")
      .password("123456")
      .serverTimeZone("Asia/Shanghai") // required, otherwise the connector cannot be used
      .databaseList("flinkcdc")
      .tableList("flinkcdc.user_info")
      .deserializer(new MySchema())
      .startupOptions(StartupOptions.initial()) // snapshot existing rows first, then stream the binlog
      .build()

    streamEnv.addSource(mysqlSource).print()

    streamEnv.execute("FlinkCDC")

  }

}

/**
  * Custom Debezium deserialization schema: converts each CDC [[SourceRecord]] into a flat
  * JSON string of the form
  * {"db":"...","tablename":"...","before":{...},"after":{...},"operationtype":"..."}.
  */
class MySchema extends DebeziumDeserializationSchema[String] {

  /**
    * Converts one change record into a JSON string and emits it downstream.
    *
    * @param sourceRecord raw Debezium record; its topic has the form "&lt;server&gt;.&lt;db&gt;.&lt;table&gt;"
    * @param collector    downstream collector receiving the JSON string
    */
  override def deserialize(sourceRecord: SourceRecord, collector: Collector[String]): Unit = {

    val result = new JSONObject()

    // Topic format: "<server-name>.<database>.<table>", e.g. "mysql_binlog_source.flinkcdc.user_info"
    val topic: String = sourceRecord.topic()
    val dbAndTableName: Array[String] = topic.split("\\.")

    // Operation type (read / create / update / delete), lower-cased
    val operation: Envelope.Operation = Envelope.operationFor(sourceRecord)
    val operationtype: String = operation.toString.toLowerCase

    val value: Struct = sourceRecord.value().asInstanceOf[Struct]

    // Row image after the change (empty JSON object for deletes);
    // getStruct already returns Struct, so no extra cast is needed
    val afterJson: JSONObject = structToJson(value.getStruct("after"))

    // Row image before the change (empty JSON object for snapshot reads / inserts)
    val beforeJson: JSONObject = structToJson(value.getStruct("before"))

    // Database name (second topic segment) — the original comments had db/table swapped
    result.put("db", dbAndTableName(1))
    // Table name (third topic segment)
    result.put("tablename", dbAndTableName(2))

    result.put("before", beforeJson)
    result.put("after", afterJson)
    result.put("operationtype", operationtype)

    collector.collect(result.toJSONString)

  }

  /** Copies every field of a Debezium Struct into a JSONObject; a null struct yields an empty object. */
  private def structToJson(struct: Struct): JSONObject = {
    val json = new JSONObject()
    if (struct != null) {
      // JavaConverters replaces the deprecated implicit JavaConversions
      import scala.collection.JavaConverters._
      for (field <- struct.schema().fields().asScala) {
        json.put(field.name(), struct.get(field))
      }
    }
    json
  }

  override def getProducedType: TypeInformation[String] = BasicTypeInfo.STRING_TYPE_INFO

}

